package com.wichell.framework.rocketmq.client;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import javax.annotation.PreDestroy;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.wichell.framework.rocketmq.exception.RocketMQException;
import com.wichell.framework.util.JacksonUtil;

/**
 * Base class for a RocketMQ push consumer whose message bodies are JSON objects.
 *
 * <p>
 * Configure {@code namesrvAddr} and {@code consumerGroup} through the setters,
 * then call {@link #startConsume(String, String[])}. Each delivered message is
 * decoded as UTF-8, converted to a map with {@code JacksonUtil.json2Map} and
 * handed to {@link #processMessage(String, Map)}; messages that cannot be
 * processed are reported through {@link #processFailCallback(String, Map)}.
 */
public abstract class RocketMQCommonConsumer {
	private static final Logger logger = LoggerFactory.getLogger(RocketMQCommonConsumer.class);

	/**
	 * Value passed to {@code ConsumeConcurrentlyContext#setDelayLevelWhenNextConsume}.
	 * NOTE(review): per the RocketMQ client docs a value &gt; 0 lets the client
	 * choose the retry delay level instead of the broker; confirm the actual
	 * delay against the broker's configured delay levels.
	 */
	private static final int RETRY_DELAY_LEVEL = 2;

	/**
	 * A message is re-queued only while its reconsume count is below this value;
	 * after that the failure callback is invoked instead.
	 */
	private static final int MAX_RECONSUME_TIMES = 2;

	/**
	 * When a message is more than this many positions behind the queue's max
	 * offset, it is treated as backlog and dropped via the failure callback.
	 */
	private static final long MAX_BACKLOG = 100000L;

	/** Name server address the consumer connects to. */
	private String namesrvAddr;
	/** Consumer group name. */
	private String consumerGroup;

	/**
	 * Where consumption starts: from the beginning of the queue or from the
	 * position where the previous run stopped.
	 */
	private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

	/**
	 * Backtracking consumption time with second precision. Time format is
	 * 20131223171201<br>
	 * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
	 * Default backtracking consumption time Half an hour ago.
	 */
	private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
	private DefaultMQPushConsumer rocketMQConsumer = null;

	/**
	 * Creates a push consumer from the configured properties, subscribes it to
	 * {@code topic} and starts it.
	 *
	 * <p>
	 * Every call creates a new consumer and stores it in this instance;
	 * {@link #shutdown()} only shuts down the most recently created one.
	 *
	 * @param topic
	 *            topic to subscribe to; must not be blank
	 * @param tags
	 *            tags to filter on, joined with {@code ||}; {@code null} or an
	 *            empty array subscribes to all tags ({@code *})
	 * @throws RocketMQException
	 *             if {@code consumerGroup}, {@code namesrvAddr} or {@code topic}
	 *             is blank, or if subscribing/starting the consumer fails
	 */
	public void startConsume(String topic, String[] tags) throws RocketMQException {
		if (StringUtils.isBlank(consumerGroup)) {
			throw new RocketMQException("consumerGroup不能为空");
		}
		if (StringUtils.isBlank(namesrvAddr)) {
			throw new RocketMQException("namesrvAddr不能为空");
		}
		if (StringUtils.isBlank(topic)) {
			throw new RocketMQException("topic不能为空");
		}
		rocketMQConsumer = new DefaultMQPushConsumer(consumerGroup);
		rocketMQConsumer.setNamesrvAddr(namesrvAddr);
		rocketMQConsumer.setConsumeFromWhere(consumeFromWhere);
		rocketMQConsumer.setConsumeTimestamp(consumeTimestamp);
		String subExpression = (tags != null && tags.length > 0) ? String.join("||", tags) : "*";
		try {
			rocketMQConsumer.subscribe(topic, subExpression);
			rocketMQConsumer.registerMessageListener(getMessageListenerConcurrently());
			rocketMQConsumer.start();
		} catch (MQClientException e) {
			// Log with the full stack trace here: the exception thrown below only
			// carries the message text of the original failure.
			logger.error("Failed to start RocketMQ consumer, group=" + consumerGroup + ", topic=" + topic, e);
			throw new RocketMQException("处理消息异常,具体为：" + e.getMessage());
		}
	}

	/**
	 * Default listener: parses the first message of each delivery as JSON and
	 * dispatches it to the enclosing consumer's callbacks.
	 */
	class MessageListenerImpl implements MessageListenerConcurrently {
		/**
		 * Handles one delivery.
		 *
		 * <p>
		 * Only the first message in {@code msgs} is processed. NOTE(review):
		 * this relies on the consumer's batch size being 1, which this class
		 * never changes - confirm against the RocketMQ default.
		 *
		 * @param msgs
		 *            delivered messages
		 * @param context
		 *            consume context; its retry delay level is set on every call
		 * @return {@code RECONSUME_LATER} when {@code processMessage} returns
		 *         {@code false} and the message has been re-consumed fewer than
		 *         {@value #MAX_RECONSUME_TIMES} times; {@code CONSUME_SUCCESS}
		 *         otherwise
		 */
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			context.setDelayLevelWhenNextConsume(RETRY_DELAY_LEVEL);
			if (msgs == null || msgs.isEmpty()) {
				// Nothing to process; acknowledge instead of failing on get(0).
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
			MessageExt messageExt = msgs.get(0);
			if (logger.isInfoEnabled()) {
				logger.info("messageExt:{}---body:{}", messageExt,
						new String(messageExt.getBody(), StandardCharsets.UTF_8));
			}
			try {
				// If messages are piling up, drop this one through the failure
				// callback rather than processing it.
				if (isBacklogged(messageExt)) {
					processFailCallback(messageExt.getKeys(), parseBody(messageExt));
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
				boolean processed = processMessage(messageExt.getKeys(), parseBody(messageExt));
				if (!processed) {
					if (messageExt.getReconsumeTimes() < MAX_RECONSUME_TIMES) {
						return ConsumeConcurrentlyStatus.RECONSUME_LATER;
					}
					// Retries exhausted: report the failure. The body is parsed
					// again so the callback receives a fresh map.
					processFailCallback(messageExt.getKeys(), parseBody(messageExt));
				}
			} catch (IOException e) {
				// The body could not be parsed; the message is acknowledged below
				// because re-delivering the same bytes would fail the same way.
				logger.error("Failed to parse message body, msgId=" + messageExt.getMsgId() + ", keys="
						+ messageExt.getKeys(), e);
			}
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}

		/**
		 * Tells whether the message lags more than {@value #MAX_BACKLOG}
		 * positions behind the queue's max offset (the next position a message
		 * would be stored at). A missing or non-numeric max-offset property
		 * disables the check instead of failing the delivery.
		 */
		private boolean isBacklogged(MessageExt messageExt) {
			String maxOffset = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
			if (StringUtils.isBlank(maxOffset)) {
				return false;
			}
			try {
				return Long.parseLong(maxOffset) - messageExt.getQueueOffset() > MAX_BACKLOG;
			} catch (NumberFormatException e) {
				logger.warn("Ignoring non-numeric " + MessageConst.PROPERTY_MAX_OFFSET + " value: " + maxOffset);
				return false;
			}
		}

		/** Decodes the message body as UTF-8 and converts the JSON text to a map. */
		private Map<String, Object> parseBody(MessageExt messageExt) throws IOException {
			return JacksonUtil.json2Map(new String(messageExt.getBody(), StandardCharsets.UTF_8));
		}
	}

	/**
	 * Shuts down the consumer created by the last
	 * {@link #startConsume(String, String[])} call, if any.
	 */
	@PreDestroy
	public void shutdown() {
		if (rocketMQConsumer != null) {
			rocketMQConsumer.shutdown();
		}
	}

	public String getNamesrvAddr() {
		return namesrvAddr;
	}

	public void setNamesrvAddr(String namesrvAddr) {
		this.namesrvAddr = namesrvAddr;
	}

	public String getConsumerGroup() {
		return consumerGroup;
	}

	public void setConsumerGroup(String consumerGroup) {
		this.consumerGroup = consumerGroup;
	}

	public ConsumeFromWhere getConsumeFromWhere() {
		return consumeFromWhere;
	}

	public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
		this.consumeFromWhere = consumeFromWhere;
	}

	public String getConsumeTimestamp() {
		return consumeTimestamp;
	}

	public void setConsumeTimestamp(String consumeTimestamp) {
		this.consumeTimestamp = consumeTimestamp;
	}

	/**
	 * Supplies the listener registered by {@link #startConsume(String, String[])}.
	 *
	 * @return a new {@link MessageListenerImpl} on every call
	 */
	public MessageListenerConcurrently getMessageListenerConcurrently() {
		return new MessageListenerImpl();
	}

	/**
	 * Processes one message.
	 *
	 * @param keys
	 *            the message keys
	 * @param bodyMap
	 *            the message body parsed from JSON
	 * @return {@code true} if the message was handled; {@code false} to request
	 *         a retry (or the failure callback once retries are used up)
	 */
	public abstract boolean processMessage(String keys, Map<String, Object> bodyMap);

	/**
	 * Called when a message is given up on: either it is dropped as backlog or
	 * {@link #processMessage(String, Map)} kept returning {@code false}.
	 *
	 * @param keys
	 *            the message keys
	 * @param bodyMap
	 *            the message body parsed from JSON
	 */
	public abstract void processFailCallback(String keys, Map<String, Object> bodyMap);

}
